"""
参考：https://stackoverflow.com/a/44786242
"""

import logging
from ctypes import (ARRAY, CDLL, POINTER, Structure, c_char, c_int, c_short,
                    c_size_t, c_ssize_t, c_void_p)

LOG = logging.getLogger(__name__)
__all__ = 'is_remote_alive',


class pollfd(Structure):
    _fields_ = (
        ('fd', c_int),
        ('events', c_short),
        ('revents', c_short),
    )


MSG_DONTWAIT = 0x40
MSG_PEEK = 0x02

EPOLLIN = 0x001
EPOLLPRI = 0x002
EPOLLRDNORM = 0x040

libc = CDLL(None)

recv = libc.recv
recv.restype = c_ssize_t
recv.argtypes = c_int, c_void_p, c_size_t, c_int

poll = libc.poll
poll.restype = c_int
poll.argtypes = POINTER(pollfd), c_int, c_int


class IsRemoteAlive:  # not needed, only for debugging
    def __init__(self, alive, msg):
        self.alive = alive
        self.msg = msg

    def __str__(self):
        return self.msg

    def __repr__(self):
        return '%s(%r,%r)' % (self.__class__.__name__, self.alive, self.msg)

    def __bool__(self):
        return self.alive


def is_remote_alive(fd):
    fileno = getattr(fd, 'fileno', None)
    if fileno is not None:
        if hasattr(fileno, '__call__'):
            fd = fileno()
        else:
            fd = fileno

    p = pollfd(fd=fd, events=EPOLLIN | EPOLLPRI | EPOLLRDNORM, revents=0)
    result = poll(p, 1, 0)
    LOG.debug('poll: %r', result)
    if not result:
        return IsRemoteAlive(True, 'empty')

    buf = ARRAY(c_char, 1)()
    result = recv(fd, buf, len(buf), MSG_DONTWAIT | MSG_PEEK)
    # MSG_DONTWAIT: 非阻塞
    # MSG_PEEK: 仅读取，不从 buffer 中清除
    if result > 0:
        return IsRemoteAlive(True, 'readable')
    elif result == 0:
        return IsRemoteAlive(False, 'closed')
    else:
        return IsRemoteAlive(False, 'errored')
